Fix bug with in-process request handling for dag.test#50419
Merged
kaxil merged 1 commit intoapache:mainfrom May 10, 2025
Merged
Fix bug with in-process request handling for dag.test#50419kaxil merged 1 commit intoapache:mainfrom
dag.test#50419kaxil merged 1 commit intoapache:mainfrom
Conversation
Fixes a subtle bug where `SUPERVISOR_COMMS` was incorrectly set during supervisor-side request handling in `InProcessSupervisorComms.send_request()`. When calling the supervisor’s `_handle_request()` (e.g. in response to a connection or variable fetch), some internal logic (like `Variable.get()` or `Connection.get_connection_from_secrets()`) would incorrectly detect that it was running in a Task SDK execution context — because `SUPERVISOR_COMMS` was still set. This led to recursive calls into the SDK flow (e.g., calling `SUPERVISOR_COMMS.lock`) **while already holding the lock**, resulting in `AttributeError: 'NoneType' object has no attribute 'lock'` or deadlocks. - The fix ensures `SUPERVISOR_COMMS` is temporarily unset while handling the request. - This prevents Task SDK context detection logic from activating during supervisor API handling. - The `set_supervisor_comms(None)` context manager is now explicitly used within `send_request()` to guard the call to `_handle_request()`. - Unit tests for `set_supervisor_comms()` covering all override/restore edge cases - A roundtrip test that verifies `send_request()` triggers `_handle_request()`, which in turn uses `send_msg()` to queue a response retrievable via `get_message()` This fixes real bugs encountered when using `dag.test()` in system tests that rely on connections or variables: - Tasks attempting to fetch a connection during execution caused the supervisor to recurse into its own comms path - This led to incorrect error handling (`500 Internal Server Error`) and test failures - Based on debugging a failure in the `example_athena` system test under `dag.test()` - Prevents regressions for other DAGs/tasks that rely on connection or variable fetches inside Task SDK runtime
ashb
approved these changes
May 10, 2025
amoghrajesh
reviewed
May 10, 2025
Contributor
amoghrajesh
left a comment
There was a problem hiding this comment.
Nice. That wasnt easy to find
ayush3singh
pushed a commit
to ayush3singh/airflow
that referenced
this pull request
May 10, 2025
kaxil
added a commit
that referenced
this pull request
May 12, 2025
kaxil
added a commit
that referenced
this pull request
Jun 3, 2025
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes a subtle bug where
SUPERVISOR_COMMSwas incorrectly set during supervisor-side request handling inInProcessSupervisorComms.send_request()-- reported in #50300 (comment) (Based on debugging a failure in theexample_athenasystem test underdag.test()). It also now prevents regressions for other DAGs/tasks that rely on connection or variable fetches inside Task SDK runtimeProblem
When calling the supervisor’s
_handle_request()(e.g. in response to a connection or variable fetch), some internal logic (likeVariable.get()orConnection.get_connection_from_secrets()) would incorrectly detect that it was running in a Task SDK execution context — becauseSUPERVISOR_COMMSwas still set.This led to recursive calls into the SDK flow (e.g., calling
SUPERVISOR_COMMS.lock) while already holding the lock, resulting inAttributeError: 'NoneType' object has no attribute 'lock'or deadlocks.This caused bug when using
dag.test()in system tests that specifically rely on connections or variables:500 Internal Server Error) and test failuresFix
SUPERVISOR_COMMSis temporarily unset while handling the request.set_supervisor_comms(None)context manager is now explicitly used withinsend_request()to guard the call to_handle_request().Why this matters
This fixes real bugs encountered when using
dag.test()in system tests that rely on connections or variables:500 Internal Server Error) and test failuresTo Reproduce
Use following dag on main:
and run:
Which will cause:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.